#include "config.h"

#include <sys/types.h>
#include <regex.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <sys/vfs.h>
#include <unistd.h>
#include <getopt.h>
#include <sys/statvfs.h>

#define DBG_SUBSYS S_LIBCLUSTER

#include "sysy_lib.h"
#include "env.h"
#include "configure.h"
#include "lichconf.h"
#include "job_dock.h"
#include "minirpc.h"
#include "rpc_proto.h"
#include "metadata.h"
#include "ynet_rpc.h"
#include "net_global.h"
#include "node.h"
#include "ylog.h"
#include "dbg.h"

typedef struct {
        uint32_t op;
        uint32_t force;
        char uuid[UUID_LEN];
        uint64_t offset;
        char name[0];
} node_admin_add_req_t;

typedef node_admin_add_req_t node_admin_getlist_req_t;
typedef node_admin_add_req_t node_addnode_req_t;
typedef node_admin_add_req_t node_admin_setmeta_req_t;
typedef node_admin_add_req_t node_admin_dropmeta_req_t;

typedef struct {
        uint32_t op;
} node_admin_count_req_t;

typedef struct {
        uint32_t op;
        char name[0];
} node_admin_dropnode_req_t;

typedef struct {
        uint32_t op;
        nid_t nid;
} node_admin_gethost_req_t;

typedef struct {
        uint32_t op;
} node_admin_check_req_t;

typedef struct {
        uint32_t op;
        char name[0];
} node_none_setadmin_req_t;

typedef node_admin_count_req_t node_none_getinfo_req_t;

typedef struct {
        uint32_t op;
        uint32_t idx;
} node_none_castoff_req_t;

typedef struct {
        uint32_t op;
        char pool[MAX_NAME_LEN];
} node_none_pooldrop_req_t;

typedef struct {
        uint32_t op;
        chkid_t chkid;
} node_none_balance_req_t;

typedef struct {
        uint32_t op;
        char name[MAX_NAME_LEN];
        char decree[MAX_NAME_LEN];
        uint64_t decreeid;
        uint64_t version;
        char buf[0];
} node_normal_setmeta_req_t;

typedef struct {
        uint32_t op;
        char name[MAX_NAME_LEN];
        char decree[MAX_NAME_LEN];
        uint64_t decreeid;
        uint64_t version;
        char buf[0];
} node_meta_drop_req_t;

typedef struct {
        uint32_t op;
        uint32_t buflen;
        char buf[0];
} node_meta_push_req_t;

typedef struct {
	uint32_t op;
	char key[MAX_NAME_LEN];
	char value[MAX_NAME_LEN];
	char buf[0];
} node_set_variable_t;

typedef struct {
        uint32_t op;
        char key[MAX_NAME_LEN];
        char buf[0];
}node_fetch_variable_t;

static node_op_handler __handler[NODE_OP_MAX];

static int __node_handler_admin_join(void *_rep, uint32_t *_replen, const void *_req)
{
        int ret;
        const node_admin_add_req_t *req = _req;

        (void) _rep;
        *_replen = 0;

        YASSERT(req->op == NODE_OP_ADMIN_JOIN);

        ret = node_srv_admin_join(req->name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __node_handler_admin_gethost(void *_rep, uint32_t *_replen, const void *_req)
{
        int ret;
        node_admin_gethost_req_t *req;

        req = (void *)_req;

        YASSERT(req->op == NODE_OP_ADMIN_GETHOST);

        ret = netable_gethost(&req->nid, _rep);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *_replen = strlen(_rep) + 1;

        return 0;
err_ret:
        return ret;
}

static int __node_handler_admin_getlist_open(void *_rep, uint32_t *_replen, const void *_req)
{
        int ret;
        node_admin_getlist_req_t *req;

        (void) _rep;
        *_replen = 0;

        req = (void *)_req;

        YASSERT(req->op == NODE_OP_ADMIN_GETLIST_OPEN);

        ret = nodetable_list_open(req->uuid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __node_handler_admin_getlist(void *_rep, uint32_t *_replen, const void *_req)
{
        int ret, len;
        node_admin_getlist_req_t *req;

        req = (void *)_req;

        YASSERT(req->op == NODE_OP_ADMIN_GETLIST);

        len = MAX_BUF_LEN;
        ret = nodetable_list(_rep, &len, req->uuid, req->offset);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *_replen = len;

        return 0;
err_ret:
        return ret;
}

static int __node_handler_admin_getlist_close(void *_rep, uint32_t *_replen, const void *_req)
{
        int ret;
        node_admin_getlist_req_t *req;

        (void) _rep;
        *_replen = 0;

        req = (void *)_req;

        YASSERT(req->op == NODE_OP_ADMIN_GETLIST_CLOSE);

        ret = nodetable_list_close(req->uuid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __node_handler_admin_count(void *_rep, uint32_t *_replen, const void *_req)
{
        int ret, count;
        node_admin_count_req_t *req;

        (void) _req;

        req = (void *)_req;
        YASSERT(req->op == NODE_OP_ADMIN_COUNT);

        ret = nodetable_count(&count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *(int *)_rep = count;
        *_replen = sizeof(count);

        return 0;
err_ret:
        return ret;
}

static int __node_handler_admin_check(void *_rep, uint32_t *_replen, const void *_req)
{
        int isadmin;
        node_admin_check_req_t *req;

        (void) _req;

        req = (void *)_req;
        YASSERT(req->op == NODE_OP_ADMIN_CHECK);

        isadmin = node_srv_admin_check();
        
        *(int *)_rep = isadmin;
        *_replen = sizeof(isadmin);

        return 0;
}

static int __node_handler_admin_dropnode(void *_rep, uint32_t *_replen, const void *_req)
{
        int ret;
        const node_admin_dropnode_req_t *req = _req;

        (void) _rep;
        *_replen = 0;

        YASSERT(req->op == NODE_OP_ADMIN_DROPNODE);

        ret = nodetable_del(req->name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __node_handler_none_getinfo(void *_rep, uint32_t *_replen, const void *_req)
{
        int ret;
        char buf[MAX_BUF_LEN], tmp[MAX_BUF_LEN];
        nodeinfo_t *info;

        (void) _req;

        info = (void *)buf;
        ret = node_srv_getinfo(info, tmp);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        nodeinfo_encode(_rep, _replen, info);

        return 0;
err_ret:
        return ret;
}

static int __node_handler_none_castoff(void *_rep, uint32_t *_replen, const void *_req)
{
        int ret;
        int idx;

        (void) _req;
        (void) _rep;
        *_replen = 0;

        const node_none_castoff_req_t *req = _req;

        idx = req->idx;
        ret = node_srv_none_castoff(idx);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __node_handler_none_pooldrop(void *_rep, uint32_t *_replen, const void *_req)
{
        int ret;

        (void) _req;
        (void) _rep;
        *_replen = 0;

        const node_none_pooldrop_req_t *req = _req;

        ret = node_srv_none_pooldrop(req->pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __node_handler_set_variable(void *_rep, uint32_t *_replen, const void *_req)
{
	int ret, type;
	const node_set_variable_t *req = _req;	

	YASSERT(NODE_OP_SET_VARIABLE == req->op);
        
        (void) _rep;
        *_replen = 0;

        /*now , only performance_analysis can be set ! */
        if (strcmp(req->key, "performance_analysis")) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = var_get_type(req->key, &type);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

	ret = var_set(req->key, req->value, type);
	if (unlikely(ret)) {
                GOTO(err_ret, ret);
	}

	DINFO_PERF("set variable %s = %s variable type %d\n", req->key, req->value, type);
        
        return 0;
err_ret:
        return ret;
}

static int __node_handler_fetch_variable(void *_rep, uint32_t *_replen, const void *_req)
{
        int ret;
        const node_fetch_variable_t *req = _req;

        YASSERT(NODE_OP_FETCH_VARIABLE == req->op);

        ret = var_get(req->key, (char *)_rep);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        } else {
                *_replen = strlen((char *)_rep) + 1;
        }
	
        DINFO_PERF("fetch variable %s = %s\n", req->key, (char *)_rep);
        
        return 0;
err_ret:
        return ret;
}

static int __node_handler(void *context, void *rep, uint32_t *replen,
                                       const void *req, uint32_t reqlen)
{
        int ret;
        const uint32_t *op = req;
        node_op_handler handler;

        (void) reqlen;
        (void) context;

        handler = __handler[*op];
        if (handler == NULL) {
                ret = EINVAL;
                DWARN("invalid op %u\n", *op);
                GOTO(err_ret, ret);
        } else {
                ret = handler(rep, replen, req);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int node_rpc_init()
{
        DINFO("node rpc init\n");
        /**********/
        __handler[NODE_OP_ADMIN_JOIN] = __node_handler_admin_join;
        __handler[NODE_OP_ADMIN_GETHOST] = __node_handler_admin_gethost;
        __handler[NODE_OP_ADMIN_GETLIST_OPEN] = __node_handler_admin_getlist_open;
        __handler[NODE_OP_ADMIN_GETLIST] = __node_handler_admin_getlist;
        __handler[NODE_OP_ADMIN_GETLIST_CLOSE] = __node_handler_admin_getlist_close;
        __handler[NODE_OP_ADMIN_COUNT] = __node_handler_admin_count;
        __handler[NODE_OP_ADMIN_DROPNODE] = __node_handler_admin_dropnode;
        __handler[NODE_OP_ADMIN_CHECK] = __node_handler_admin_check;

        /**********/
        __handler[NODE_OP_NONE_GETINFO] = __node_handler_none_getinfo;
        __handler[NODE_OP_NONE_CASTOFF] = __node_handler_none_castoff;
        __handler[NODE_OP_NONE_POOLDROP] = __node_handler_none_pooldrop;
        __handler[NODE_OP_SET_VARIABLE] = __node_handler_set_variable;
        __handler[NODE_OP_FETCH_VARIABLE] = __node_handler_fetch_variable;
        
        minirpc_request_register(MINIRPC_OP_NODE, __node_handler, NULL);

        return 0;
}

int node_rpc_admin_join(const char *master, const char *name)
{
        int ret, replen, reqlen;
        node_admin_add_req_t *req;
        char buf[MAX_BUF_LEN], _rep[MAX_BUF_LEN];

        req = (void *)buf;

        req->op = NODE_OP_ADMIN_JOIN;
        strcpy(req->name, name);
        reqlen = strlen(name) + 1 + sizeof(*req);

retry:
        ret = minirpc_request_wait(MINIRPC_OP_NODE, _rep, &replen,
                                   (void *)req, reqlen, master, 10);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        DBUG("admin not ready\n");
                        usleep(100);
                        goto retry;
                } else
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int node_rpc_set_variable(const char *hostname, const char *key, const char *value)
{
        int ret, replen, reqlen;
        node_set_variable_t *req;      
        char buf[MAX_BUF_LEN], _rep[MAX_BUF_LEN];

        req = (void *)buf;
        req->op = NODE_OP_SET_VARIABLE;
        strcpy(req->key, key);
        strcpy(req->value, value);
        reqlen = sizeof(*req);

        ret = minirpc_request_wait(MINIRPC_OP_NODE, _rep, &replen,
                                   (void *)req, reqlen, hostname, 10);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int node_rpc_fetch_variable(const char *hostname, const char *key, char *value)
{
        int ret, replen, reqlen;
        node_fetch_variable_t *req;      
        char buf[MAX_BUF_LEN], _rep[MAX_BUF_LEN];
        
        req = (void *)buf;
        req->op = NODE_OP_FETCH_VARIABLE;
        strcpy(req->key, key);
        reqlen = sizeof(*req);

        ret = minirpc_request_wait(MINIRPC_OP_NODE, _rep, &replen,
                                   (void *)req, reqlen, hostname, 10);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        strcpy(value, _rep);

        return 0;
err_ret:
        return ret;
}

int node_rpc_admin_gethost(const char *admin, const nid_t *nid, char *host)
{
        int ret, replen;
        node_admin_gethost_req_t req;

        req.op = NODE_OP_ADMIN_GETHOST;
        req.nid = *nid;

        replen = MAX_BUF_LEN;
        ret = minirpc_request_wait(MINIRPC_OP_NODE, (void *)host, &replen,
                                   (void *)&req, sizeof(req), admin, 10);
        if (unlikely(ret)) {
                DWARN("%s ret (%u) %s\n", admin, ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int node_rpc_admin_getlist_open(const char *admin, const char *uuid)
{
        int ret, replen;
        node_admin_getlist_req_t *req;
        char rep[MAX_BUF_LEN] = {}, buf[MAX_BUF_LEN] = {};

        req = (void *)buf;
        req->op = NODE_OP_ADMIN_GETLIST_OPEN;
        strcpy(req->uuid, uuid);

        replen = MAX_BUF_LEN;
        ret = minirpc_request_wait(MINIRPC_OP_NODE, rep, &replen,
                                   (void *)req, sizeof(*req), admin, 10);
        if (unlikely(ret)) {
                DBUG("%s ret (%u) %s\n", admin, ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int node_rpc_admin_getlist(const char *admin, char *buf, int *len, const char *uuid, uint64_t offset)
{
        int ret, replen;
        node_admin_getlist_req_t *req;

        req = (void *)buf;
        req->op = NODE_OP_ADMIN_GETLIST;
        req->offset = offset;
        strcpy(req->uuid, uuid);

        replen = MAX_BUF_LEN;
        ret = minirpc_request_wait(MINIRPC_OP_NODE, (void *)buf, &replen,
                                   (void *)req, sizeof(*req), admin, 10);
        if (unlikely(ret)) {
                DWARN("%s ret (%u) %s\n", admin, ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        *len = replen;

        return 0;
err_ret:
        return ret;
}

int node_rpc_admin_getlist_close(const char *admin, const char *uuid)
{
        int ret, replen;
        node_admin_getlist_req_t *req;
        char rep[MAX_BUF_LEN] = {}, buf[MAX_BUF_LEN] = {};

        req = (void *)buf;
        req->op = NODE_OP_ADMIN_GETLIST_CLOSE;
        strcpy(req->uuid, uuid);

        replen = MAX_BUF_LEN;
        ret = minirpc_request_wait(MINIRPC_OP_NODE, rep, &replen,
                                   (void *)req, sizeof(*req), admin, 10);
        if (unlikely(ret)) {
                DWARN("%s ret (%u) %s\n", admin, ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int node_rpc_admin_count(const char *admin, int *_count)
{
        int ret, count, replen;
        node_admin_count_req_t req;

        req.op = NODE_OP_ADMIN_COUNT;

        replen = sizeof(count);
        ret = minirpc_request_wait(MINIRPC_OP_NODE, (void *)&count, &replen,
                                   (void *)&req, sizeof(req), admin, 10);
        if (unlikely(ret)) {
                DWARN("%s ret (%u) %s\n", admin, ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        *_count = count;

        return 0;
err_ret:
        return ret;
}

int node_rpc_admin_dropnode(const char *admin, const char *name)
{
        int ret, replen, reqlen;
        node_admin_dropnode_req_t *req;
        char buf[MAX_BUF_LEN], _rep[MAX_BUF_LEN];

        req = (void *)buf;

        req->op = NODE_OP_ADMIN_DROPNODE;
        strcpy(req->name, name);
        reqlen = strlen(name) + 1 + sizeof(*req);

        ret = minirpc_request_wait(MINIRPC_OP_NODE, _rep, &replen,
                                   (void *)req, reqlen, admin, 10);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int node_rpc_none_getinfo(char *info, int *buflen, const char *name, int timeout)
{
        int ret;
        node_none_getinfo_req_t req;

        req.op = NODE_OP_NONE_GETINFO;

        ret = minirpc_request_wait(MINIRPC_OP_NODE, (void *)info, buflen,
                                   (void *)&req, sizeof(req), name, timeout);
        if (unlikely(ret)) {
                if (ret == ENONET)
                        goto err_ret;
                else
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int node_rpc_none_castoff(const char *name, int idx)
{
        int ret, replen;
        node_none_castoff_req_t *req;
        char buf[BUF_LEN], rep[BUF_LEN * 2];

        req = (void *)buf;
        req->op = NODE_OP_NONE_CASTOFF;
        req->idx = idx;

        ret = minirpc_request_wait(MINIRPC_OP_NODE, (void *)rep, &replen,
                                   (void *)req, sizeof(*req), name, 10);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int node_rpc_none_pooldrop(const char *name, const char *pool)
{
        int ret, replen;
        node_none_pooldrop_req_t *req;
        char buf[BUF_LEN], rep[BUF_LEN * 2];

        req = (void *)buf;
        req->op = NODE_OP_NONE_POOLDROP;
        strcpy(req->pool, pool);

        ret = minirpc_request_wait(MINIRPC_OP_NODE, (void *)rep, &replen,
                                   (void *)req, sizeof(*req), name, 10);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int node_rpc_admin_check(const char *admin, int *_ismaster)
{
        int ret, ismaster, replen;
        node_admin_check_req_t req;

        req.op = NODE_OP_ADMIN_CHECK;

        ANALYSIS_BEGIN(0);
        
        replen = sizeof(ismaster);
        ret = minirpc_request_wait(MINIRPC_OP_NODE, (void *)&ismaster, &replen,
                                   (void *)&req, sizeof(req), admin, 1);
        if (unlikely(ret)) {
                DWARN("%s ret (%u) %s\n", admin, ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        ANALYSIS_END(0, 1000 * 500, NULL);
        DINFO("node %s is %s\n", admin, ismaster ? "admin" : "normal");
        *_ismaster = ismaster;

        return 0;
err_ret:
        return ret;
}
